--Message queue implemented by PG
--@Author Anthony Chen 
--@Copyright reserved
--2016-12-30

CREATE TABLE mq_config (
	table_name name not null unique,
	channel name primary key
);


CREATE TABLE mq_base (
	msg_id bigserial not null,
	type_id bigint not null default 0,	
	sent boolean not null default false ,
	sent_at timestamp with time zone not null default now(),
	payload jsonb not null,	
	delivered_at timestamp with time zone
	,CONSTRAINT pk_mq_base PRIMARY KEY (msg_id)
);

--Use of this index SET "enable_seqscan = OFF" should be executed to avoid seq scan
CREATE UNIQUE INDEX idx_mq_base_not_sent
  ON mq_base USING btree (msg_id,type_id) WHERE sent = false;


CREATE OR REPLACE FUNCTION mq_trigger_notify() RETURNS TRIGGER
LANGUAGE PLPGSQL AS
$$
DECLARE t_channel name;
BEGIN
	SELECT channel INTO t_channel FROM mq_config 
	WHERE table_name = TG_RELNAME;

	EXECUTE 'NOTIFY ' || quote_ident(t_channel) || ', ' 
	|| quote_literal(NEW.msg_id);
	RETURN NEW;
END;
$$;


CREATE OR REPLACE FUNCTION mq_create_queue
(in_channel text)
RETURNS mq_config 
LANGUAGE PLPGSQL VOLATILE SECURITY DEFINER AS $$

DECLARE 
out_val mq_config%ROWTYPE;
t_table_name name;
BEGIN

	t_table_name := 'mq_queue_' || in_channel;

	INSERT INTO mq_config (table_name, channel)
	VALUES (t_table_name, in_channel) returning * into out_val;

	EXECUTE 'CREATE TABLE ' || quote_ident(t_table_name) || '(
		like mq_base INCLUDING ALL )';

	EXECUTE ' GRANT ALL ON TABLE '||quote_ident(t_table_name) || '  TO public';

	EXECUTE 'CREATE TRIGGER tr_mq_notify
	AFTER INSERT ON ' || quote_ident(t_table_name) || '
	FOR EACH ROW EXECUTE PROCEDURE mq_trigger_notify()';

	RETURN out_val;

END;
$$;

REVOKE EXECUTE ON FUNCTION mq_create_queue(text) FROM public;

CREATE OR REPLACE FUNCTION mq_drop_queue(in_channel name) RETURNS bool
LANGUAGE plpgsql VOLATILE SECURITY DEFINER  AS $$

declare t_table_name name;

BEGIN

	SELECT table_name INTO t_table_name FROM mq_config 
	WHERE channel = in_channel;

	EXECUTE 'DROP TABLE ' || quote_ident(t_table_name) || ' CASCADE';

	DELETE FROM mq_config WHERE channel = in_channel;

	RETURN FOUND;

END;
$$;

REVOKE EXECUTE ON FUNCTION mq_drop_queue(in_channel name) FROM public;

CREATE OR REPLACE FUNCTION mq_send
(in_channel text, in_payload jsonb,in_type bigint DEFAULT 0 )
RETURNS mq_base
LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE channel_entry mq_config%ROWTYPE;
out_val mq_base%ROWTYPE;
BEGIN
	SELECT * INTO channel_entry FROM mq_config
	WHERE channel = in_channel;
	IF NOT FOUND THEN
		RAISE EXCEPTION 'Channel Not Found';
	END IF;

	EXECUTE 'INSERT INTO ' || quote_ident(channel_entry.table_name)
	|| ' (type_id, payload) VALUES ( '
		|| in_type ||','
		|| quote_literal(in_payload) || '::jsonb )
	RETURNING msg_id,type_id,sent, sent_at,payload, delivered_at'
	INTO out_val ;
	RETURN out_val;
END;
$$;

CREATE OR REPLACE FUNCTION mq_recv(
	in_channel name,  --Channel Name
	in_num_msgs bigint DEFAULT 1, --Number of message to receive 
	in_type_id bigint DEFAULT NULL::bigint --Message type id
)
RETURNS SETOF mq_base LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE channel_entry mq_config%ROWTYPE;

BEGIN
	SELECT * INTO channel_entry FROM mq_config
	WHERE channel = in_channel;
	IF in_type_id is NULL THEN
		RETURN QUERY EXECUTE
		$e$ UPDATE $e$ || quote_ident(channel_entry.table_name) || $e$
		SET delivered_at = now(),sent=true
		WHERE msg_id IN (SELECT msg_id 
			FROM $e$ || quote_ident(channel_entry.table_name) || 
			$e$    WHERE sent=false  
			LIMIT $e$ || in_num_msgs || $e$
			FOR UPDATE SKIP LOCKED )
			RETURNING msg_id, type_id ,sent ,sent_at, payload, delivered_at $e$;
	ELSE
		RETURN QUERY EXECUTE
		$e$ UPDATE $e$ || quote_ident(channel_entry.table_name) || $e$
		SET delivered_at = now(),sent=true
		WHERE msg_id IN (SELECT msg_id 
			FROM $e$ || quote_ident(channel_entry.table_name) || 
			$e$    WHERE sent=false  and type_id= $e$ || in_type_id ||  $e$
			LIMIT $e$ || in_num_msgs || $e$
			FOR UPDATE SKIP LOCKED
			)
			RETURNING msg_id, type_id ,sent ,sent_at, payload, delivered_at $e$;
	END IF;
END;
$$;

CREATE OR REPLACE FUNCTION mq_recv(
	in_channel name,  --Channel Name
	in_type_rs text,  --SQL represent type id set
	in_num_msgs bigint DEFAULT 1 --Number of message to receive 
)
RETURNS SETOF mq_base LANGUAGE PLPGSQL VOLATILE
AS $$
DECLARE channel_entry mq_config%ROWTYPE;

BEGIN
	SELECT * INTO channel_entry FROM mq_config
	WHERE channel = in_channel;

	RETURN QUERY EXECUTE
	$e$ UPDATE $e$ || quote_ident(channel_entry.table_name) || $e$
	SET delivered_at = now(),sent=true
	WHERE msg_id IN (SELECT msg_id 
		FROM $e$ || quote_ident(channel_entry.table_name) || 
		$e$    WHERE sent=false  and type_id in $e$ || in_type_rs ||  $e$
		LIMIT $e$ || in_num_msgs || $e$
		FOR UPDATE SKIP LOCKED
		)
		RETURNING msg_id, type_id ,sent ,sent_at, payload, delivered_at $e$;
END;
$$;

